End-to-end real-time stream processing: log4j-flume-kafka-structured-streaming
Generating simulated log4j logs
Jar dependencies (pom.xml)
```xml
<!-- log4j and slf4j-log4j12 carry no <version> here; they are presumably
     resolved through a parent pom or a <dependencyManagement> section -->
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- the Flume log4j appender that ships log records to the Flume avro source -->
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.8.0</version>
</dependency>
```
Java code: LoggerGenerator.java
```java
import org.apache.log4j.Logger;

// Emits one INFO record per second; the log4j.properties below forwards each
// record to the Flume avro source via the flume-ng log4j appender.
public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        while (true) {
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }
        // To list the topics on the broker: kafka-topics.sh --list --zookeeper 127.0.0.1:2181
    }
}
```
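LoggerGenerator can be run straight from the IDE. If you prefer the command line, a minimal sketch using the Maven exec plugin (assuming LoggerGenerator sits in the default package and log4j.properties is on the classpath, e.g. under src/main/resources):

```bash
$ mvn compile exec:java -Dexec.mainClass=LoggerGenerator
```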
log4j.properties configuration
```properties
log4j.rootLogger=INFO,stdout,flume

# Console appender, mainly for local debugging.
# A record rendered with this pattern looks roughly like:
#   2019-01-01 12:00:00,000 [main] [LoggerGenerator] [INFO] - value : 0
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

# Flume log4j appender: forwards every record to the Flume avro source on 127.0.0.1:44444.
# UnsafeMode=true keeps logging from failing the application when the Flume agent is down.
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 127.0.0.1
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
```
Starting the Kafka broker
Create the topic ahead of time (not strictly required: with the broker default auto.create.topics.enable=true, Kafka creates it the first time the Flume sink writes to it).
Once flume-ng is up (next section), start a Kafka console consumer to watch the data arrive.
```bash
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default_flume_topic
```
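Whether or not you pre-create it, you can confirm the topic exists with the listing command already noted in LoggerGenerator.java:

```bash
$ kafka-topics.sh --list --zookeeper 127.0.0.1:2181
```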
Flume agent configuration and startup
The avro-memory-kafka.conf used in an earlier post:
```properties
# avro-memory-kafka.conf

# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source: the avro source receives the events
# sent by the log4j appender on 127.0.0.1:44444
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
# Must be set to org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default_flume_topic

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
```
Start flume-ng
```bash
$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &

$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default_flume_topic --from-beginning --new-consumer
```

Note that with --bootstrap-server the new consumer is already implied; on newer Kafka releases the --new-consumer flag is deprecated (and eventually removed), so it can simply be dropped.
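If the console consumer shows nothing, it can help to first run the agent in the foreground with console logging instead of nohup, so source/sink errors are visible immediately; a typical invocation is:

```bash
$ flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka -Dflume.root.logger=INFO,console
```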
Real-time processing with Spark Structured Streaming
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Must match the topic the Flume Kafka sink writes to
topic = 'default_flume_topic'
brokers = "127.0.0.1:9092"

spark = SparkSession.builder.appName("log4j-flume-kafka-structured-streaming").getOrCreate()

lines = (spark.readStream.format("kafka")
         .option("kafka.bootstrap.servers", brokers)
         .option("subscribe", topic)
         # start reading at offset 7 of partition 0 of the topic
         .option("startingOffsets", """{"%s":{"0": 7}}""" % topic)
         .load()
         # the Kafka value column is binary; cast it to a string before processing
         .selectExpr("CAST(value AS STRING)"))

# Custom handling of the transported payload goes here -- e.g. parsing a JSON string.
# Below is a simple word count over the raw log lines.
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)
word_counts = words.groupBy('word').count()

query = word_counts.writeStream.outputMode("complete").format("console").start()

query.awaitTermination()
```
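The Kafka source for Structured Streaming is not bundled with the default Spark distribution, so the job is usually submitted with the spark-sql-kafka connector on the classpath. A minimal sketch, where the connector version is an assumption that must match your Spark/Scala build and log4j_flume_kafka_ss.py is a hypothetical name for the script above:

```bash
$ spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
    log4j_flume_kafka_ss.py
```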